epoll 与 zeromq 结合

场景:前端用户使用epoll事件,接受用户请求。后端使用zeromq请求后端服务器。

思路:将epoll加入zeromq的poll里面,以监听epoll是否有事件。如果是epoll的事件,则调用epoll_wait 获取事件的fd列表并处理。否则按照zeromq的方式处理。

zeromq 的poll用法参考:http://api.zeromq.org/4-0:zmq-poll 以及例子里面的:mspoller.cpp 这里重点不在epoll中,所以实现有些粗糙,自行加入非阻塞等等。


#include "zmq.hpp"
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
 
#define MAX_EPOLL 50
int mybind(int port)
{
    int sockfd;
    struct sockaddr_in my_addr;
    struct sockaddr_in their_addr;
    int sin_size;
    if((sockfd = socket(AF_INET,SOCK_STREAM,0))==-1) {
        perror("create socket");
        exit(1);
    }
    //初始化结构体,并绑定端口
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(port);
    my_addr.sin_addr.s_addr = INADDR_ANY;
    bzero(&(my_addr.sin_zero),8);
    setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&my_addr,sizeof(my_addr));
    if(bind(sockfd,(struct sockaddr *)&my_addr,sizeof(struct sockaddr))==-1){
        perror("bind port");
        exit(1);
    }
    if(listen(sockfd,10)==-1){
        perror("listen port");
        exit(1);
    }
    return sockfd;
}
 
void process_request(int epfd,int client,zmq::socket_t & request)
{
    char buf[255]={0};
    int len=recv(client,buf,sizeof(buf),0);
    printf("recv data %d\n",client);
    if(len<=0){
        struct epoll_event ev;
        printf("close %d\n",client);
        ev.events=EPOLLIN;
        ev.data.fd=client;
        ev.data.ptr=NULL;
        epoll_ctl(epfd,EPOLL_CTL_DEL,client,&ev);
        close(client);
        return;
    }
    printf("recv %s\n",buf);
    char sfd[32];
    sprintf(sfd,"%d",client);
    zmq::message_t msg_fd(strlen(sfd));
    memcpy(msg_fd.data(),sfd,strlen(sfd));
    zmq::message_t msg_empty;
    zmq::message_t msg_data(len);
    memcpy(msg_data.data(),buf,len);
    request.send(msg_fd,ZMQ_SNDMORE);
    request.send(msg_empty,ZMQ_SNDMORE);
    request.send(msg_data);
}
void process_listen(int epfd,int listen_fd)
{
    struct sockaddr_in their_addr;
    socklen_t sin_size;
    sin_size = sizeof(struct sockaddr_in);
    int client=accept(listen_fd,(struct sockaddr *)&their_addr,&sin_size);
    printf("accept %d\n",client);
    if(client<0){
        perror("accept socket");
        exit(1);
    }
    struct epoll_event ev;
    ev.events=EPOLLIN;
    ev.data.fd=client;
    epoll_ctl(epfd,EPOLL_CTL_ADD,client,&ev);
 
}
 
void process_epoll_in(int epfd,int listen_fd,zmq::socket_t & request)
{
    struct epoll_event evs[MAX_EPOLL]={0};
    int nfds=epoll_wait(epfd,&evs[0],MAX_EPOLL,-1);
    printf("epoll wait %d\n",nfds);
    for(int i=0; i<nfds; i++){
        printf("epoll ev %d %d\n",i,evs[i].data.fd);
        if(listen_fd==evs[i].data.fd){
            process_listen(epfd,listen_fd);
        }else{
            process_request(epfd,evs[i].data.fd,request);
        }
 
    }
 
}
  
int main (int argc, char *argv[])
{
    zmq::context_t context(1);
 
    zmq::socket_t responder(context, ZMQ_REP);
    responder.bind("tcp://*:5560");
 
    zmq::socket_t request(context, ZMQ_DEALER );
    request.connect("tcp://localhost:5560");
    //Initialize epoll
    //
    struct epoll_event ev;
    int epfd=epoll_create(MAX_EPOLL);
    int listen_fd=mybind(5561);
    printf("bind %d\n",listen_fd);
    ev.events=EPOLLIN;
    ev.data.fd=listen_fd;
    epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
 
    //  Initialize poll set
    zmq::pollitem_t items [] = {
        { responder, 0, ZMQ_POLLIN, 0 },
        { request, 0, ZMQ_POLLIN, 0 },
        { 0, epfd, ZMQ_POLLIN, 0 }
    };
 
    while(1)
    {
        /* Poll for events indefinitely */
        int rc = zmq_poll (items, 3, -1);
        if (items [0].revents & ZMQ_POLLIN) {
            //worker process
            printf("responder recv\n");
            zmq::message_t message;
            responder.recv(&message);
            responder.send(message);
        }
        if (items[1].revents & ZMQ_POLLIN) {
            //request recv msg;
            printf("request recv\n");
            zmq::message_t message[3];
            int index;
            int more=1;
            for(index=0;more && index<3; index++){
                request.recv(&message[index]);
                more=message[index].more();
            }
            if(index>=3){
                int fd=atoi((char*)message[0].data());
                send(fd,message[2].data(),message[2].size(),0);
            }
        }
        if(items[2].revents & ZMQ_POLLIN){
            process_epoll_in(epfd,listen_fd,request);
        }
    }
}
```cpp
humboldt Written by:

humboldt 的趣味程序园